package com.jie.flink.cdc.flinksink;

import com.jie.flink.cdc.doman.DataChangeInfo;
import com.jie.flink.cdc.util.JsonUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.kafka.common.serialization.StringSerializer;

import java.nio.charset.StandardCharsets;

/**
 * Kafka key serializer that derives the record key from a JSON-encoded
 * {@link DataChangeInfo}.
 *
 * <p>The key is the UTF-8 encoding of {@code database.schema.tableName}. A blank
 * schema contributes an empty segment, so the key then looks like
 * {@code database..tableName}.
 *
 * @author zhanggj
 * @date 2023/9/7 10:45
 */
public class KafkaKeySerializer extends StringSerializer {

    /** Separator placed between the database, schema and table-name segments of the key. */
    private static final String KEY_SEPARATOR = ".";

    /**
     * Builds the Kafka record key for a change event.
     *
     * @param topic the destination topic; not used when building the key
     * @param data  the change event as a JSON string that is parsed into a
     *              {@link DataChangeInfo}
     * @return the UTF-8 bytes of {@code database.schema.tableName}, or {@code null}
     *         (no key) when {@code data} is blank or parsing yields no object
     */
    @Override
    public byte[] serialize(String topic, String data) {
        if (StringUtils.isBlank(data)) {
            return null;
        }

        final DataChangeInfo dataChangeInfo = JsonUtils.stringToObject(data, DataChangeInfo.class);
        // NOTE(review): whether stringToObject can return null (e.g. on malformed JSON) is not
        // visible from this file; guard anyway so a bad payload yields a null key rather than
        // a NullPointerException inside the serializer.
        if (dataChangeInfo == null) {
            return null;
        }

        // A blank schema (null, empty or whitespace-only) is written as an empty segment.
        final String schema = StringUtils.isBlank(dataChangeInfo.getSchema()) ? "" : dataChangeInfo.getSchema();

        // Database and table name are only null-guarded (not blank-normalized), so every key
        // that could be produced before this guard was added is still produced unchanged.
        return String.join(KEY_SEPARATOR,
                        nullToEmpty(dataChangeInfo.getDatabase()),
                        schema,
                        nullToEmpty(dataChangeInfo.getTableName()))
                .getBytes(StandardCharsets.UTF_8);
    }

    /** Returns {@code value}, or the empty string when {@code value} is {@code null}. */
    private static String nullToEmpty(String value) {
        return value == null ? "" : value;
    }
}
